package cn._51doit.live.udf;

import cn._51doit.live.pojo.OrderMain;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * 1.数据的转换
 * 2.数据过滤
 * 3.将数据打平
 */
public class JsonToOrderMainFunction extends ProcessFunction<String, OrderMain> {

    @Override
    public void processElement(String value, Context ctx, Collector<OrderMain> out) throws Exception {

        try {
            JSONObject jsonObject = JSON.parseObject(value);
            JSONArray jsonArray = jsonObject.getJSONArray("data");
            String type = jsonObject.getString("type");
            for (int i = 0; i < jsonArray.size(); i++) {
                OrderMain orderMain = jsonArray.getObject(i, OrderMain.class);
                orderMain.setType(type);
                //将数据输出
                out.collect(orderMain);
            }
        } catch (Exception e) {
            //记录日志
        }

    }

}
